package com.data.mall.kafka;

import com.alibaba.fastjson.JSON;
import com.data.mall.common.KafkaTrackingDTO;
import com.data.mall.domain.StockLog;
import com.data.mall.exception.BusinessException;
import com.data.mall.repositry.StockLogRepository;
import com.data.mall.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author shixukai
 * @Package com.data.mall.common
 * @Description: TODO
 * @date 2021/4/29
 */

@Service
public class KafkaProducerServiceImpl implements KafkaProduceService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    private OrderService orderService;
    @Autowired
    private StockLogRepository stockLogRepository;

    @Override
    public boolean transactionAsyncReduceStock(Long userId, Long itemId, Long promoId, Integer amount, String stockLogId) {
        Map<String,Object> bodyMap = new HashMap<>();
        bodyMap.put("itemId",itemId);
        bodyMap.put("amount",amount);
        bodyMap.put("stockLogId",stockLogId);

        Map<String,Object> argsMap = new HashMap<>();
        argsMap.put("itemId",itemId);
        argsMap.put("amount",amount);
        argsMap.put("userId",userId);
        argsMap.put("promoId",promoId);
        argsMap.put("stockLogId",stockLogId);

        String normalMessage = JSON.toJSONString(bodyMap);

        AtomicBoolean isStatus = new AtomicBoolean(false);

        kafkaTemplate.send("service",normalMessage).addCallback(success ->{
            try {
                orderService.createOrder(userId,itemId,promoId,amount,stockLogId);
            } catch (BusinessException e) {
                e.printStackTrace();
                //设置对应的stockLog为回滚状态
                StockLog stockLogDO = stockLogRepository.findByStockLogId(stockLogId);
                stockLogDO.setStatus(3);
                stockLogRepository.save(stockLogDO);
                isStatus.set(false);
            }
            isStatus.set(true);
        },failure ->{
            isStatus.set(false);
        });

        return isStatus.get();
    }

    @Override
    public void sendTrackingMessage(KafkaTrackingDTO object){
        String normalMessage = JSON.toJSONString(object);
        kafkaTemplate.send("tracking",normalMessage);
    }

}
